package run

import (
	"context"
	"fmt"
	. "gitee.com/youbeiwuhuan/go-xxljob-executor/biz/model"
	. "gitee.com/youbeiwuhuan/go-xxljob-executor/global"
	"gitee.com/youbeiwuhuan/go-xxljob-executor/handler"
	"gitee.com/youbeiwuhuan/go-xxljob-executor/proccess"
	"gitee.com/youbeiwuhuan/go-xxljob-executor/tools"
	"go.uber.org/zap"
	"runtime/debug"
	"strconv"
	"sync"
	"time"
)

// JobRunner holds the state used to queue TriggerParam values for one job and
// to execute them with that job's handler.
type JobRunner struct {
	// running is set to true by Start and to false by Stop. idleTimes is
	// initialised to 0 and is not updated by any active code in this file.
	running   bool
	idleTimes int

	// jobId and jobHandler are the values passed to NewJobRunner.
	// triggerQueue is the buffered channel PushTriggerQueue sends on, and
	// queueSize is incremented on every push.
	jobId        int32
	jobHandler   handler.IJobHandler
	triggerQueue chan TriggerParam
	queueSize    int

	// cancel is the cancel function of the context created in Start.
	cancel       context.CancelFunc
	runStatusMap sync.Map // logId -> status; kept for a later extension that lets the backend refresh task run status in real time

	// rtCh is an unbuffered channel of job results created in NewJobRunner.
	rtCh chan ReturnT

	// triggerCallback receives the HandleCallbackParam values produced when a
	// trigger finishes, times out or is discarded.
	triggerCallback *proccess.TriggerCallbackProccessor
}

// NewJobRunner builds a JobRunner for jobId that executes triggers with
// jobHandler and reports outcomes through triggerCallback.
//
// The returned runner is not started: running is false, idleTimes is 0 and
// the status map is empty, all of which are the zero values of their fields.
// The trigger queue is buffered with DEFAULT_JOB_QUEUE_SIZE slots and the
// result channel is unbuffered.
func NewJobRunner(jobId int32, jobHandler handler.IJobHandler, triggerCallback *proccess.TriggerCallbackProccessor) *JobRunner {
	pending := make(chan TriggerParam, DEFAULT_JOB_QUEUE_SIZE)
	results := make(chan ReturnT)

	return &JobRunner{
		jobId:           jobId,
		jobHandler:      jobHandler,
		triggerQueue:    pending,
		rtCh:            results,
		triggerCallback: triggerCallback,
	}
}

// Start launches the goroutine that consumes the trigger queue and marks the
// runner as running.
//
// The goroutine executes queued triggers one at a time via Run and exits when
// the context created here is cancelled (see the cancel field) or when the
// trigger queue is closed. On exit it calls cleanQueue to deal with triggers
// that were queued but never executed.
func (t *JobRunner) Start() {
	ctx, cancel := context.WithCancel(context.Background())
	t.cancel = cancel

	go func() {
		// Clean up triggers that were never executed.
		defer t.cleanQueue()

		for {
			// No default branch: the select must block while there is nothing
			// to do, otherwise this loop spins and burns a full CPU core.
			// Idle-based recycling of the runner is not implemented.
			select {
			case triggerParam, ok := <-t.triggerQueue:
				if !ok {
					// A closed queue yields zero values forever; treat the
					// close as a stop request instead of running an empty
					// trigger.
					Logger.Info(">>>>>stop")
					return
				}
				Logger.Info(">>>>>>" + triggerParam.String())
				t.Run(triggerParam)
			case <-ctx.Done():
				Logger.Info(">>>>>stop")
				return
			}
		}
	}()

	t.running = true
}

// Stop cancels the consumer goroutine started by Start, marks the runner as
// not running and closes the trigger queue.
//
// Calling Stop on a runner that was never started, or calling it a second
// time, is a no-op: without this guard the first case would call a nil
// cancel function and the second would close an already closed channel, both
// of which panic.
//
// NOTE(review): the guard is not synchronised; concurrent calls to Stop still
// need external coordination.
func (t *JobRunner) Stop() {
	if t.cancel == nil {
		return
	}

	t.cancel()
	t.cancel = nil
	t.running = false

	// Closed last, after cancellation has been requested.
	close(t.triggerQueue)
}

// IsRunningOrHasQueue reports whether the runner is marked as running or
// still has triggers waiting in its queue.
//
// The pending count is read from the channel itself. The queueSize field is
// only ever incremented in this file, so it cannot tell whether anything is
// still waiting.
func (t *JobRunner) IsRunningOrHasQueue() bool {
	return t.running || len(t.triggerQueue) > 0
}

// PushTriggerQueue records the trigger as queued and appends it to the
// trigger queue. The send blocks while the queue is full.
func (t *JobRunner) PushTriggerQueue(triggerParam TriggerParam) {
	Logger.Info("---------------" + triggerParam.String())

	// Record the status before the trigger becomes visible to the consumer.
	// Storing it after the send could overwrite the running/complete status
	// that the consumer writes once it picks the trigger up.
	t.setRunStatus(triggerParam.LogId, TASK_STATUS_INQUEUE)

	t.triggerQueue <- triggerParam
	t.queueSize = t.queueSize + 1
}

// setRunStatus stores status as the current state of the trigger identified
// by logId, replacing any state stored for it earlier.
func (t *JobRunner) setRunStatus(logId int64, status string) {
	statuses := &t.runStatusMap
	statuses.Store(logId, status)
}

// Run executes one trigger with the runner's job handler and reports the
// outcome through the trigger callback processor.
//
// The handler runs in its own goroutine (Init, Execute, Destroy, in that
// order). Run blocks until the handler delivers a result or, when
// triggerParam.ExecutorTimeout is positive, until that many seconds have
// elapsed. On timeout a HANDLE_COCE_TIMEOUT callback is pushed; the handler
// goroutine is not interrupted and keeps running, but its late result is
// discarded. A panic inside the handler goroutine is recovered, logged and
// reported as a FAIL_CODE result.
func (t *JobRunner) Run(triggerParam TriggerParam) {
	// One result channel per invocation. With a channel shared across
	// invocations, a job that finishes after its timeout would either block
	// forever on the send or hand its stale result to the next trigger. The
	// buffer of one lets the goroutine always complete its single send.
	resultCh := make(chan ReturnT, 1)

	go func() {
		t.setRunStatus(triggerParam.LogId, TASK_STATUS_RUNNING)
		// LogDateTime is divided by 1000 to obtain seconds for time.Unix.
		xxlLogger := tools.NewXxlJobFileAppender(triggerParam.LogId, time.Unix(triggerParam.LogDateTime/1000, 0))

		defer func() {
			if r := recover(); r != nil {
				Logger.Error("Run pannic ,", zap.Any("err", r))

				xxlLogger.Log(fmt.Sprintf("<br>----------- Run panic : %v  <br>----------- xxl-job job execute end(error) -----------", string(debug.Stack())))

				resultCh <- ReturnT{Code: FAIL_CODE, Msg: "Run pannic"}
			}
		}()

		// Named jobCtx so that it does not shadow the imported context package.
		jobCtx := handler.JobContext{
			JobId: triggerParam.JobId,

			ExecutorHandler:       triggerParam.ExecutorHandler,
			ExecutorParams:        triggerParam.ExecutorParams,
			ExecutorBlockStrategy: triggerParam.ExecutorBlockStrategy,
			ExecutorTimeout:       triggerParam.ExecutorTimeout,

			LogId:       triggerParam.LogId,
			LogDateTime: triggerParam.LogDateTime,

			// Only the bean glue type is supported.
			GlueType: triggerParam.GlueType,

			BroadcastIndex: triggerParam.BroadcastIndex,
			BroadcastTotal: triggerParam.BroadcastTotal,
		}

		xxlLogger.Log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.ExecutorParams)

		t.jobHandler.Init(jobCtx, xxlLogger)
		rt := t.jobHandler.Execute(triggerParam.ExecutorParams, jobCtx, xxlLogger)
		t.jobHandler.Destroy(jobCtx, xxlLogger)

		xxlLogger.Log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode=" + strconv.Itoa(int(rt.Code)) + ", handleMsg = " + rt.Msg)

		resultCh <- rt
	}()

	Logger.Info(fmt.Sprintf("========================%d", triggerParam.ExecutorTimeout))

	// A nil channel never becomes ready, so without a positive timeout the
	// select below simply waits for the job result.
	var timeout <-chan time.Time
	if triggerParam.ExecutorTimeout > 0 {
		timer := time.NewTimer(time.Second * time.Duration(triggerParam.ExecutorTimeout))
		defer timer.Stop()
		timeout = timer.C
	}

	select {
	case rt := <-resultCh:
		t.handleCallback(rt, triggerParam)
	case <-timeout:
		t.setRunStatus(triggerParam.LogId, TASK_STATUS_RUN_COMPLETE)

		t.triggerCallback.PushTriggerCallback(HandleCallbackParam{
			LogId: triggerParam.LogId,
			// Echo the trigger's own log timestamp. The previous value,
			// time.Now().Nanosecond()/1000, is only the microsecond offset
			// inside the current second and is not a timestamp at all.
			LogDateTim: triggerParam.LogDateTime,

			HandleCode: HANDLE_COCE_TIMEOUT,
			HandleMsg:  "job execute timeout ",
		})
	}
}

// handleCallback marks the trigger as complete and pushes the job result r
// to the trigger callback processor.
func (t *JobRunner) handleCallback(r ReturnT, triggerParam TriggerParam) {
	// TODO: results that must be reported asynchronously (an "unknown" result
	// code) are left for a later tag.

	t.setRunStatus(triggerParam.LogId, TASK_STATUS_RUN_COMPLETE)

	t.triggerCallback.PushTriggerCallback(HandleCallbackParam{
		LogId: triggerParam.LogId,
		// Echo the trigger's own log timestamp. The previous value,
		// time.Now().Nanosecond()/1000, is only the microsecond offset inside
		// the current second and is not a timestamp at all.
		LogDateTim: triggerParam.LogDateTime,

		HandleCode: r.Code,
		HandleMsg:  r.Msg,
	})
}

// cleanQueue drains the trigger queue and pushes a FAIL_CODE "job is killed!"
// callback for every trigger that was queued but never executed.
//
// It returns once the queue has been closed and emptied, so it must only be
// used on a shutdown path where the queue is closed (Stop does this).
//
// The previous loop received a single value and then looped on !ok: a
// received trigger was dropped without any callback, while a closed, empty
// queue caused an endless stream of zero-value callbacks.
func (t *JobRunner) cleanQueue() {
	for triggerParam := range t.triggerQueue {
		t.triggerCallback.PushTriggerCallback(HandleCallbackParam{
			LogId: triggerParam.LogId,
			// Echo the trigger's own log timestamp rather than the
			// sub-second offset that time.Now().Nanosecond() returns.
			LogDateTim: triggerParam.LogDateTime,

			HandleCode: FAIL_CODE,
			HandleMsg:  "job is killed!",
		})
	}
}
